package com.qyl.framework.client.impl;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.qyl.framework.base.RPCSwitch;
import com.qyl.framework.base.constant.ChannelConstantKey;
import com.qyl.framework.base.protocol.RequestProto;
import com.qyl.framework.base.value.BlockingRequestValue;
import com.qyl.framework.client.decoder.ResponseProtoDecoder;
import com.qyl.framework.client.encoder.RequestProtoEncoder;
import com.qyl.framework.handler.ResponseHandler;
import com.qyl.framework.server.impl.RPCServer;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.util.Attribute;

/**
 * 
 * @author created by 齐翌来（qiyilai@foxmail.com）
 * 
 * 2019年2月17日
 * 
 * @description
 *  RPC客户端
 */
public class RPCClient extends RPCSwitch {
	
	private String ip;

	private EventLoopGroup bossGroup;
	
	private Channel channel;
	
	private BlockingRequestValue requestValue;
	
	private static final int STRAT_TIME_OUT = 50000;
	
	private static final Logger logger = LoggerFactory.getLogger(RPCClient.class);
	
	@Override
	public void doStart() {
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		
		try {
			Bootstrap clientBootstrap = new Bootstrap().group(bossGroup).channel(NioSocketChannel.class)
					.localAddress(port).handler(new ChannelInitializer<SocketChannel>() {
 
						@Override
						protected void initChannel(SocketChannel ch) throws Exception {
		                    ChannelPipeline pipeline = ch.pipeline();  
		                    pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
		                    pipeline.addLast(new ResponseProtoDecoder());
		                    pipeline.addLast(new ResponseHandler());
		                    pipeline.addLast(new LengthFieldPrepender(4));
		                    pipeline.addLast(new RequestProtoEncoder());    
						}
					}).option(ChannelOption.TCP_NODELAY, true);
			logger.info("clientBootstrap inited");
			ChannelFuture future = clientBootstrap.connect(ip, port).sync();
			boolean success = future.await(STRAT_TIME_OUT);
			
	        if (success) {
	        	this.channel = future.channel();
	 	        Attribute<BlockingRequestValue> attr = channel.attr(ChannelConstantKey.BLOCKING_REQUEST);
	 	        attr.set(requestValue);
	 	        this.bossGroup = bossGroup;
	    		logger.info("RPCClient started, success connetct address【{}:{}】", ip, port);
	        } else {
	        	logger.error("RPCClient start failure, can not connetct address【{}:{}】", ip, port);
	        	bossGroup.shutdownGracefully();
	        } 
		} catch (Exception e) {
			 bossGroup.shutdownGracefully();
			 logger.error("RPCClient can not connect {}:{}", ip, port, e);
		}
		
	}

	@Override
	protected void doStop() {
		if (bossGroup != null) {
			bossGroup.shutdownGracefully();
		}	
	}

	public RPCClient() {
		this.requestValue = new BlockingRequestValue();
	}
	
	public RPCClient(String ip, int port){
		this.ip= ip;
		this.port = port;
		this.requestValue = new BlockingRequestValue();
	}
	
	public BlockingRequestValue send(RequestProto msg){
		requestValue.request(msg.getRequestId());
		channel.writeAndFlush(msg);
		return requestValue;
	}
}
